Hadoop Learning

Tao Zou

2024-10-04

Hadoop 组成

Hadoop 用来存储分析海量数据,其组件为:

  • HDFS: 负责数据存储。一个NameNode管理多个DataNode,Second NameNode负责辅助NameNode。

  • Yarn: 负责资源调度。一个ResourceManager(RM)管理多个NodeManager。RM管理整个集群的内存和cpu。每个NodeManager上可以有多个container,用于运行任务。如果一个NodeManager上有2个cpu,则这个NM上最多可以同时运行2个container。

  • MapReduce: 负责计算。Map阶段并行处理输入数据,Reduce阶段汇总结果。MapReduce好像用于离线计算,Spark可以用于内存计算。MapReduce的上层有Hive支持,Spark的上层有Spark MLlib、Spark SQL、Spark Streaming等支持。

root用户相关

设置用户密码:

sudo passwd root

切换到root用户,输入下述命令,然后键入密码。

su root

虚拟机组成

硬件

我的联想电脑的CPU核心数为12。虚拟机运行数量 * 每个虚拟机CPU数 * 每个CPU核心数 不要超过12。

IP

设置VMWare的IP地址

VMWare中,点击编辑->虚拟网络编辑器,会看见VMnet1和VMnet8。选中VMnet8,点击更改设置。再次点击编辑->虚拟网络编辑器,会看见VMnet0,VMnet1和VMnet8。选中VMnet8,可以将子网IP第三个八位字节设置为10(如192.168.10.0)。点击NAT设置,将网关IP也同样设置(网关IP的第四个八位字节一般是2)。最后,点击虚拟网络编辑器中的确定。

本机Windows中设置VMnet8的地址

在本机中,点击以太网->更改适配器选项->VMWare Network Adapter VMnet8->属性->Internet协议版本 4。按照下图配置。

虚拟机内部设置IP地址

打开文件。如果识别不出vim命令,则需要下载apt install vim

su root
sudo vim /etc/netplan/01-network-manager-all.yaml

键入i进入编辑模式。

network:
  version: 2
  renderer: networkd
  ethernets:
    ens33:
      dhcp4: no
      addresses:
        - 192.168.10.100/24
      gateway4: 192.168.10.2
      nameservers:
        addresses:
          - 192.168.10.2

Esc,再键入:wq可以保存修改的内容。

应用更改:

sudo netplan apply

修改主机名称:

vim /etc/hostname

建立IP地址与主机名的映射关系:

sudo vim /etc/hosts

增加下述内容:

192.168.10.100 hadoop100
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
192.168.10.105 hadoop105
192.168.10.106 hadoop106
192.168.10.107 hadoop107
192.168.10.108 hadoop108
  1. 使用ifconfig检查ens33部分的内容,查看inet是否为192.168.10.100

  2. 然后再ping www.baidu.com看看是否能访问外网。

  3. hostname查看主机名称是否为hadoop100

在克隆出的新的虚拟机中,需要修改01-netcfg.yaml中的ens33 的 addresses 为192.168.10.102192.168.10.103等,以及修改主机名即可。

安装Hadoop、JDK

安装JDK

安装JDK需要注意下载操作系统对应的版本,例如64位的Ubantu系统需要对应安装64位的JDK。如果错误地安装JDK版本,操作系统运行java命令会报错找不到对应文件。通过uname -m,如果显示x86_64则说明当前Ubantu系统是64位的。

解压文件,在压缩包路径下运行下述命令,-C参数指定解压路劲。

tar -zxvf jdk-22_linux-x64_bin.tar.gz -C /home/zt/development/JDK17

配置Java环境变量,创建一个环境文件my_env.sh

cd /etc/profile.d
sudo vim my_env.sh

编写Java环境变量

#JAVA_HOME
export JAVA_HOME=/home/zt/development/JDK17/jdk-22.0.1
export PATH=$PATH:$JAVA_HOME/bin

刷新

source /etc/profile
java -version
# 查看Ubantu机环境变量
echo $PATH

安装Hadoop

参照安装JDK,解压->打开my_env.sh,编写环境:

#HADOOP_HOME
export HADOOP_HOME=/home/zt/development/Hadoop336/hadoop-3.3.6
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
source /etc/profile

Hadoop准备

Hadoop重要文件结构

运行下述命令查看Hadoop命令。

cd ~/Hadoop336/hadoop-3.3.6/bin
ls

运行下述命令查看配置文件。

cd ~/Hadoop336/hadoop-3.3.6/etc/hadoop
ls

运行下述命令,查看集群配件相关启动命令。

cd ~/Hadoop336/hadoop-3.3.6/sbin
ls

Hadoop本地模式

新建一个.txt文档如下:

aa bb
cc aa

运行下述命令(我在hadoop-3.3.6目录下运行的):

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.6.jar wordcount myinput/word.txt myoutput

输出文件会被保存在新创建的文件夹myoutput中,myoutput文件夹须由hadoop创建,该文件夹不能预先存在。

Hadoop节点文件同步

scp复制文件到其他节点

使用scp命令复制文件到其他节点上。但是这个命令不能复制文件夹到其他节点上。

# 从hadoop102拷贝jdk-22.0.1到hadoop103相同目录下
scp -r /home/zt/development/JDK17/jdk-22.0.1 zt@hadoop103:/home/zt/development/JDK17
# 从hadoop103拉取hadoop102的hadoop-3.3.6
scp -r zt@hadoop102:/home/zt/development/Hadoop336/hadoop-3.3.6 ./
# 在hadoop103执行命令将hadoop102的jdk-22.0.1和hadoop-3.3.6拷贝到hadoop104上
scp -r zt@hadoop102:/home/zt/development/* zt@hadoop104:/home/zt/development

上述代码在hadoop103将“公钥-私钥”对创建好了发送公钥给hadoop104后,才能够被成功执行。

rsync同步文件夹

rsync -av /home/zt/development/Hadoop336/hadoop-3.3.6 zt@hadoop104:/home/zt/development/hadoop-3.3.6

循环复制文件到所有节点的相同目录下

使用自定义命令。

#在/home/zt/bin目录下(没有则创建mkdir bin)
vim xsync

在xsync中写入:

if [ $# -lt 1]
then 
    echo Not Enough Arguement!
    exit;
fi

for host in hadoop102 hadoop103 hadoop104
do
    echo ============ $host ============
    for file in $@
    do
        if [ -e $file ];
            then
                pdir=$(cd -P $(dirname $file); pwd)
                fname=$(basename $file)
                ssh $host "mkdir -p $pdir"
                rsync -av $pdir/$fname $host:$pdir
            else
                echo $file does not exists!
        fi
    done
done

然后执行chmod 777 xsync

然后在/home/zt目录下运行:bin/xsync bin/即可将bin文件夹及其内容同步到hadoop103和hadoop104的/home/zt中了。

注意该命令为用户自建命令。如果在root用户下执行该命令时要给该命令加上绝对路径,让root用户能找到该命令。

SSH免密登录

在hadoop103上创建一个“公钥-私钥对”。将公钥发送给hadoop104。当hadoop103发送用私钥加密的数据到hadoop104时,hadoop104会通过hadoop103的公钥将数据解密。hadoop104要给hadoop103发送响应时,也会使用hadoop103的公钥加密数据,hadoop103接收数据后通过私钥解密。

ls -al可以查看到所有的隐藏文件包括/home/zt下的.ssh文件。

# 在.ssh目录下执行下述命令,之后输入三次回车,会在.ssh目录下生成id_rsa私钥和id_rsa.pub公钥两个文件
ssh-keygen -t rsa
# 通过下述命令可以将本地的公钥和密钥发送到hadoop103上,之后即可通过ssh hadoop103对hadoop 103无密访问。
ssh-copy-id hadoop103

.ssh/authorized_keys中可以看到哪些节点可以无密访问本节点。

注意“公钥-私钥”对对于每一个用户都是不同的。比如在zt用户下配置了本节点的公钥-密钥且传送到了hadoop104,但是切换到root用户后,依然不能对hadoop104进行无密访问。

hadoop102也需要自己对自己设置无密访问。

配置Hadoop集群

目标配置情况

hadoop102 hadoop103 hadoop104
HDFS NameNode, DataNode DataNode SecondaryNameNode, DataNode
YARN NodeManager ResourceManager, NodeManager NodeManager

寻找默认配置文件:

  1. ~/Hadoop336/hadoop-3.3.6/share/hadoop/common/中执行unzip hadoop-common-3.3.6.jar,可以在相同文件夹下得到文件core-default.xml

  2. ~/Hadoop336/hadoop-3.3.6/share/hadoop/hdfs/中执行unzip hadoop-hdfs-3.3.6.jar,可以在相同文件夹下得到文件hdfs-default.xml

  3. ~/Hadoop336/hadoop-3.3.6/share/hadoop/mapreduce/中执行unzip hadoop-mapreduce-client-core-3.3.6.jar,可以在相同文件夹下得到文件mapred-default.xml

  4. ~/Hadoop336/hadoop-3.3.6/share/hadoop/yarn/中执行unzip hadoop-yarn-common-3.3.6.jar,可以在相同文件夹下得到文件yarn-default.xml

可用户自定义的配置文件存在于$HADOOP_HOME/etc/hadoop中,分别为:core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml

修改用户自定义配置

core配置

$HADOOP_HOME/etc/hadoop/core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- 指定NameNode的地址 -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:8020</value>
    </property>
    <!-- 指定hadoop数据的存储目录 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/zt/development/Hadoop336/data</value>
    </property>
    <!-- 配置HDFS网页登录使用的静态用户为zt -->
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>zt</value>
    </property>
</configuration>

HDFS配置

$HADOOP_HOME/etc/hadoop/hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- 暴露NameNode的Web访问地址 -->
    <property>
        <name>dfs.namenode.http-address</name>
        <value>hadoop102:9870</value>
    </property>
    <!-- 暴露Secondary NameNode的Web访问地址 -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop104:9868</value>
    </property>
</configuration>

我把上面的hadoop102:9870给配置成了0.0.0.0:9870用以尝试能否在本地电脑上通过网址http://1.95.72.190:9870访问到华为云服务器上的HDFS的web页面。暂时还没有成功。

YARN配置

$HADOOP_HOME/etc/hadoop/yarn-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- 指定MR走Shuffle -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!-- 指定ResourceManaer的地址 -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop103</value>
    </property>
    <!-- 环境变量的继承 -->
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>
    
</configuration>

MapReduce配置文件

$HADOOP_HOME/etc/hadoop/mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- 指定MapReduce运行在YARN上 -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

启动集群

集群初始化

$HADOOP_HOME/etc/hadoop/workers中将localhost修改为:

hadoop102
hadoop103
hadoop104

$HADOOP_HOME下执行命令hdfs namenode -format初始化。初始化完成后会在$HADOOP_HOME目录下出现logs文件夹,以及在.xml配置文件中指定的hadoop数据存储目录下生成data文件夹。在这个data文件夹中的data/dfs/name/current/VERSION存放的是版本号:

#Wed Sep 04 09:06:26 EDT 2024
blockpoolID=BP-1667933497-192.168.10.102-1725455186531
cTime=1725455186531
clusterID=CID-95dd7d98-be69-4b43-a791-7adc3ee4b196
layoutVersion=-66
namespaceID=1642564378
storageType=NAME_NODE

启动HDFS

HDFS不允许用root用户启动,为了强制使用root用户进行操作,可以在$HADOOP_HOME/etc/hadoop/hadoop-env.sh中添加下述内容。(不建议这样做)

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root

export YARN_NODEMANAGER_USER=root
export YARN_RESOURCEMANAGER_USER=root

节点hadoop102$HADOOP_HOME目录下执行命令sbin/start-dfs.sh可以启动HDFS。Web端hdfs位于http://hadoop102:9870

在指向上面的命名时我出现了一个报错说三个节点上都找不到JAVA_HOME。打开$HADOOP_HOME/etc/hadoop/hadoop-env.sh,追加下述内容即可。

export JAVA_HOME=/home/zt/development/JDK17/jdk-22.0.1

执行jps可以显示正在运行的Java进程。

启动YARN

节点hadoop103$HADOOP_HOME目录下执行命令sbin/start-yarn.sh可以启动YARN。Web端yarn位于http://hadoop103:8088

Java22版本在启动yarn时会出错,尝试使用Java8版本,并更改环境变量my_env.sh和hadoop环境变量hadoop-env.sh后再次启动YARN则不会出现问题。

执行jps可以显示正在运行的Java进程。

测试集群

测试HDFS

# 在hadoop根目录下创建新文件夹
hadoop fs -mkdir /myfolder
# 上传本节点文件到myfolder文件夹
hadoop fs -put /home/zt/mytest.txt /myfolder

上传到hadoop的上述文件实际上是被分成了多个数据块被存储到了每一个DataNode中,每个DataNode都有一份该文件的副本。具体路径为:

Hadoop336/data/dfs/data/current/BP-1433059334-192.168.10.102-1725551956628/current/finalized/subdir0/subdir0

每一个DataNode中都有所上传的mytest.txt文件的完整版。如果是一个大文件,它可能被拆分并存储到几个”blk_…“文件中。

测试YARN

# 在$HADOOP_HOME路径下执行下述命令。
# 确保在/myfolder下有一个用于被统计单词数量的.txt文件
# /myfolderOutput为输出文件夹
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.6.jar wordcount /myfolder /myfolderOutput

配置历史服务器和日志聚集

$HADOOP_HOME/etc/hadoop/mapred-site.xml中加入下述property,然后分发到其他节点。通过在$HADOOP_HOME/bin下执行命令mapred -daemon start historyserver运行历史服务器。在http://hadoop102:19888/jobhistory查看历史服务器的Web。

<property>
    <name>mapreduce.jobhistory.address</name>
    <value>hadoop102:10020</value>
</property>
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>hadoop102:19888</value>
</property>

通过历史服务器可以查看YARN任务的配置参数、任务细节、运行时间、程序运行的日志等。

$HADOOP_HOME/etc/hadoop/mapred-site.xml中加入下述property。然后分发到其他节点。通过这种方式配置日志聚集,使得可以在历史服务器中看到程序的运行日志。

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.log.server.url</name>
  <value>http://hadoop102:19888/jobhistory/logs</value>
</property>
<!--设置日志的保留时间为7天  -->
<property>
  <name>yarn.log-aggregation.retain-seconds</name>
  <value>604800</value>
</property>

集群异常处理

自定义Hadoop集群启停命令

我将之命名为hadoopstartstop.sh。在/home/zt/bin目录下创建好hadoopstartstop.sh后,写入下述代码。之后执行chmod 777 hadoopstartstop.sh。之后使用hadoopstartstop.sh starthadoopstartstop.sh stop即可启停。

#!/bin/bash

if [ $# -lt 1 ]
then 
    echo "No Args Input..."
    exit ;
fi

case $1 in
"start")
        echo "====== 启动hadoop集群 ======"
        echo "------ 启动hdfs ------"
        ssh hadoop102 "/home/zt/development/Hadoop336/hadoop-3.3.6/sbin/start-dfs.sh"
        echo "------ 启动yarn ------"
        ssh hadoop103 "/home/zt/development/Hadoop336/hadoop-3.3.6/sbin/start-yarn.sh"
        echo "------ 启动history server ------"
        ssh hadoop102 "/home/zt/development/Hadoop336/hadoop-3.3.6/bin/mapred --daemon start historyserver"
;;
"stop")
        echo "====== 关闭hadoop集群 ======"
        echo "------ 关闭history server ------"
        ssh hadoop102 "/home/zt/development/Hadoop336/hadoop-3.3.6/bin/mapred --daemon stop historyserver"
        echo "------ 关闭yarn ------"
        ssh hadoop103 "/home/zt/development/Hadoop336/hadoop-3.3.6/sbin/stop-yarn.sh"
        echo "------ 关闭hdfs ------"
        ssh hadoop102 "/home/zt/development/Hadoop336/hadoop-3.3.6/sbin/stop-dfs.sh"
;;
*)
    echo "Input Args Error..."
;;
esac

自定义查看所有节点Java进程情况

/home/zt/bin下创建jpsall,并写入下述信息。然后执行chmod 777 jpsall。直接使用jpsall即可执行。

#!/bin/bash

for host in hadoop102 hadoop103 hadoop104
do
        echo ====== $host ======
        ssh $host /home/zt/development/JDK8/jdk1.8.0_202/bin/jps
done

其他问题的解决方案

VMWare虚拟机间歇性宕机

跟一个网络相关的程序有关

未关闭集群直接关机节点

此操作会可能会造成再次启动hdfs时datanode的clusterID与NameNode的clusterID不匹配。导致后续无法正常启动datanode。

可以在每一个节点上使用rm -rf /home/zt/development/Hadoop336/data/dfs/data/*来使datanode可以被正常启动。

HDFS

HDFS分布式存储系统具有一次写入,多次读出的特点:一个文件经过创建、写入、关闭之后就不再改变,除了追加。

HDFS的文件在物理上是分块存储的(Block1, Block 2, …),一般100MB/S的磁盘传输速率设置块大小为128MB,200-300MB/s的磁盘传输速率设置块大小为256MB。

每一个Block只能属于一个文件,每个文件由一个或多个Block组成。Block的大小并不会被预先分配,例如我上传一个1KB的文件,那么这个文件由实际只占用空间1KB的Block存储。再次上传新的文件时,又会有新的Block来存储新上传的文件。

HDFS缺点:

  1. 不适合低延时的数据访问。

  2. 无法高效地对大量小文件进行存储。因为会占用大量NameNode内存(一般是128G)来存储文件目录。

  3. 不能被多个线程并发地写。

NameNode作用:

  1. 管理HDFS的名称空间。

  2. 配置副本策略。

  3. 管理数据块(Block)的映射信息。

  4. 处理客户端读写请求。

DataNode作用:

  1. 实际存储数据块。

  2. 执行数据块的读写操作。

HDFS的Shell操作

HDFS的基本命令hadoop fs <命令>hdfs dfs <命令>

HDFS上传、下载操作

# 查看某一hadoop命令的帮助文档
hadoop fs -help rm
# 在hadoop的根目录下创建一个名为threekingdoms的空文件夹
hadoop fs -mkdir /threekingdoms
# 将本节点的当前目录下的本地文件剪切上传到HDFS中,本地被上传的文件会被删除
hadoop fs -moveFromLocal ./shukingdom.txt /threekingdoms
# 将本节点的当前目录下的本地文件复制上传到HDFS中,与-put命令效果相同
hadoop fs -copyFromLocal ./shukingdom.txt /threekingdoms
# 追加一个文件到目的文件的末尾
hadoop fs -appendToFile ./jinkingdom.txt /threekingdoms/weikingdom.txt
# 从HDFS拷贝到本地,与-get命令效果相同
hadoop fs -copyToLocal /threekingdoms/shukingdom.txt ./

HDFS常用操作

# 将HDFS中一个文件拷贝到HDFS中的另一个路径中
hadoop fs -cp /myfolder/mytest.txt /threekingdoms
# 将HDFS中一个文件移动到HDFS中的另一个路径中
hadoop fs -mv /myfolder/mytest.txt /threekingdoms
# 递归删除一个目录中所有的内容
hadoop fs -rm -r /threekingdoms
# 将HDFS文件的所有者改为zt:ztgroup,冒号前时owner,冒号后是Group
hadoop fs -chown zt:ztgroup /threekingdoms
# 显示一个文件末尾1KB的数据
hadoop fs -tail /threekingdoms/weikingdom.txt
# 显示HDFS一个文件夹的大小,比如:44 132,44表示在一个datanode上的文件大小,132表示在所有datanode上的文件大小。
hadoop fs -du -s -h /threekingdoms
# 显示HDFS一个文件夹中所有文件和子文件夹的大小
hadoop fs -du -h /threekingdoms
# 设置副本数量为10,10会记录在NameNode元数据中,因为实际只有三个DataNode,因此最多只能有三个副本。不会对物理内存造成影响。
hadoop fs -setrep 10 /threekingdoms

HDFS的API操作

NameNode和SecondaryNameNode的机制

NameNode元数据是存储在运行内存当中的。NameNode的内存为128GB,一个Block占元数据的150B。

  1. NameNode启动,加载编辑日志edits和镜像文件fsimage到运行内存中。

  2. 客户端向NameNode发送增/删/查/改请求。

  3. edits记录操作日志、更新滚动日志。

  4. 增/删/查/改操作完成。

  5. Secondary NameNode请求NameNode是否需要CheckPoint,若需要则执行CheckPoint。CheckPoint的触发条件为:1.定时时间到了;2.Edits中的数据满了。

  6. edits滚动,从edits_inprogress_001变为edits_001,新增edits_inprogress_002。

  7. NameNode中的edits和fsimage被拷贝到Secondary NameNode中并生成fsimage.chkpoint然后拷贝到NameNode中,该fsimage.chkpoint被重命名为fsimage

fsimage和edits

在NameNode被初始化后,在Hadoop336/data/dfs/name/current中会生成下述四个文件:

fsimage_0000000000000000000
fsimage_0000000000000000000.md
seen_txid
VERSION

fimage文件是HDFS元数据中的一个永久性检查点,包含HDFS文件系统的所有目录和文件inode的序列化信息。可以使用下述命令查看每一个fsimage文件。

hdfs oiv -p XML -i fsimage_0000000000000000000 -o /my/path/to/store/myfsimage.xml

edits是存放HDFS所有更新操作的文件,客户端请求的所有操作首先会被记录到edits中。可以使用下述命令查看每一个edits文件。

hdfs oev -p XML -i edits_0000000000000000012-0000000000000000013 -o /my/path/to/store/myedits.xml

seen_txid文件保存的是一个数字,即最后一个edits_的数字。

设置CheckPont默认触发

hdfs-default.xml中添加下述配置,设置NameNode的CheckPoint的默认触发时间是1小时,默认edits满操作次数是10000次,检查edits操作次数的时间间隔为1分钟。

<property>
    <name>dfs.namenode.checkpoint.period</name>
    <value>3600s</value>
</property>
<property>
    <name>dfs.namenode.checkpoint.txns</name>
    <value>10000</value>
</property>
<property>
    <name>dfs.namenode.checkpoint.check.period</name>
    <value>60s</value>
</property>

DataNode工作机制

  1. DataNode在每个Block中存储数据+数据长度+校验和+时间戳

  2. DataNode启动后向NameNode注册,NameNode向DataNode返回注册成功的信息。以后每隔一定时间DataNode向NameNode上报所有的Block信息。

  3. 心跳3秒一次,用于确保每个NameNode和DataNode之间的通信连接正常。若NameNode超过10分钟+30秒没有收到DataNode的心跳,则认为该DataNode不可用。

参数设置:

<property>
    <name>dfs.blockreport.intervalMsec</name>
    <value>21600000</value>
    <description>Determines block reporting interval in milliseconds.</description>
</property>
<property>
    <name>dfs.datanode.directoryscan.interval</name>
    <value>21600s</value>
    <description>Interval in seconds for Datanode to scan its data directories and reconcile the difference between blocks in memory and on the disk.</description>
</property>

数据完整性

数据在上传到HDFS中时会在本地进行数据封装,给数据增加一个校验位(如crc校验位)。HDFS接收到数据时会根据校验位对数据进行检查。校验位如果与数据不匹配,则表明数据在上传过程中出现损坏。

掉线时限

如果DataNode与NameNode超过\(2\times \text{recheck_interval} + 10 \times \text{heart_beat_interval}=10\text{mins}\ 30\text{seconds}\),则NameNode与DataNode断开通讯,认为DataNode不可用。

参数设置:

<property>
    <name>dfs.datanode.heartbeat.recheck-interval</name>
    <value>300000</value>
    <description>millisecond.</description>
</property>
<property>
    <name>dfs.heartbeat.interval</name>
    <value>3</value>
    <description>second.</description>
</property>

HDFS的读写数据流程

客户端写数据的逻辑流程

  1. 客户端向NameNode请求上传目标文件。NameNode检查权限和目录结构,返回允许上传的响应。该过程需要创建DistributedFileSystem对象

  2. 客户端请求上传第一个Block(0-128MB)。NameNode元数据进行副本存储节点的选择,返回DataNode1, DataNode2, DataNode3这三个存储数据的节点。

  3. 客户端HDFS数据流向DataNode1传输文件,DataNode1向DataNode2传输文件,DataNode2向DataNode3传输文件。DataNode3向DataNode2响应传输成功,DataNode2向DataNode1响应传输成功,DataNode1向客户端响应传输成功。为提高并发性,DataNode1开始被写入文件的同时已经向DataNode2节点开始传输文件了。数据流会将目标文件缓存为一个个chunk(512B)+chunksum(4B),当所有chunk512B+chunksum4B足够大了之后它们会被封装成一个Packet(64KB),Packet是传输数据的最小单元。该过程需要创建FSDataOutputStream对象

客户端读数据的逻辑流程

  1. 客户端向NameNode请求下载文件。NameNode检查权限以及文件是否存在,返回目标文件的元数据(包含目标文件的存储目录、所在的Block)。该过程需要创建DistributedFileSystem对象

2.客户端HDFS数据流向节点距离最近的DataNode请求数据,若当前最近DataNode节点负载过大,则数据流会向别的DataNode请求数据。读取DataNode文件时采取串型读取方式,即读完目标文件的Block1后再读取Block2。该过程需要创建FSDataInputStream对象

网络拓扑节点距离计算

设有一台服务器,其结构如下:

\[ 服务器 \begin{cases} 集群d1 \begin{cases} 机架r1: \text{DataNode1}, \text{DataNode2}, \text{DataNode3}\\ 机架r2: \text{DataNode4}, \text{DataNode5}, \text{DataNode6}\\ 机架r3: \text{DataNode7}, \text{DataNode8}, \text{DataNode9} \end{cases}\\ 集群d2 \begin{cases} 机架r4: \text{DataNode10}, \text{DataNode11}, \text{DataNode12}\\ 机架r5: \text{DataNode13}, \text{DataNode14}, \text{DataNode15}\\ 机架r6: \text{DataNode16}, \text{DataNode17}, \text{DataNode18} \end{cases} \end{cases} \]

DataNode7和DataNode12的节点距离为:\(2+2+2=6\)

DataNode1和DataNode2的节点距离为:\(2\)

HDFS的默认副本存储方式:加入副本因子为\(3\),那么HDFS会在当前DataNode下存储一份副本,然后在随机另一个机架的随机DataNode中存储一份副本(出于安全性考虑),最后再在这个随机机架中选择一个随机DataNode存储一份副本(机架传输速度更快)。

MapReduce

MapReude概述

MapReduce不擅长实时计算,不能实现像MySQL那种毫秒级查询。

MapReduce不擅长流式计算,不能像Sparkstreaming和flink那样处理新出现的一条一条的数据。

MapReduce不擅长有向无环图计算(迭代式串联计算),而Spark的RDD适合做这样的计算。

Map阶段对原始数据进行切分,然后分布式计算,保存计算结果到磁盘分区。

Reduce阶段从磁盘分区中拿取数据并汇总,然后输出最终结果。

MapReduce进程:

  1. MrAppMaster:负责整个程序的过程调度和状态协调。

  2. MapTask:负责Map阶段的整个数据处理流程。

  3. ReduceTask:负责Reduce阶段的整个数据处理流程。